package com.offcn.bigdata.streaming.p1

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * SparkStreaming
  * 和receiver
  * 在下面的案例中，我们将原来的local[*]改成了local，则出现警告：
  *     spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data,
  *     otherwise Spark jobs will not get resources to process the received data.
  *     在本地模式下面执行程序，如果是有receiver来接收数据，那么master中必须设置local[N],N要大于1。
  *     否则，也就意味着是local，为当前程序只分配一个工作线程，而该工作线程会被优先用来接收数据，自然就不会有多余的线程来执行计算。
  *
  *
  */
object _02StreamingWordCountApp {
    def main(args: Array[String]): Unit = {
        if(args == null || args.length != 2) {
            println(
                """
                  |Usage: <host> <port>
                """.stripMargin)
            System.exit(-1)
        }
        val Array(host, port) = args

        val conf = new SparkConf()
                    .setAppName("_02StreamingWordCountApp")
                    .setMaster("local[*]")
        val batchInterval = Seconds(2)
        val ssc = new StreamingContext(conf, batchInterval)
        val input: ReceiverInputDStream[String] = ssc.socketTextStream(host, port.toInt,
            storageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)
        val words: DStream[String] = input.flatMap(line => line.split("\\s+"))
        val pairs: DStream[(String, Int)] = words.map(word => (word, 1))
        val ret: DStream[(String, Int)]  = pairs.reduceByKey(_+_)

        ret.print()
        /*
            为了保障流式计算的执行，必须要在编写完transformation和outputs操作调用streamingcontext.start()
            otherwise,程序不会执行
         */
        ssc.start
        println("start之后----termination之前")

        /**
          * IllegalStateException:
          * Adding new inputs, transformations, and output operations after starting a context is not supported
          *
          * 在Stremaing编程中，所有的业务逻辑都必须要在ssc.start之前完成，不能再start之后添加任何的input、tansformation、output操作
          * 这在sparkstreaming中不被支持的，否则就会启动两次streamingContext。
          *
          */
        //ret.print()
        /**
          * 为了让程序持续不断的执行，除了调用start开启执行以外，必须让driver常驻内存，也就是SparkContext在内存中要常驻
          * 如此，才能不断接受receiver发送过来的数据的block地址信息，交由streamingContext拆分成若干batch，其次再通过
          * sparkcontext进行stage阶段划分和task的组装与分发
          */
        ssc.awaitTermination()
        println("----termination之后")
    }
}
